package top.ply.messageservice.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.ply.message.service.TelephoneMsgService;

import java.util.Map;
import java.util.concurrent.*;

@Component("phoneMsgMQErrorHandler")
public class PhoneMsgMQErrorHandler implements RabbitListenerErrorHandler {

    Logger logger = LoggerFactory.getLogger(PhoneMsgMQErrorHandler.class);

    public static final Integer MAX_RETRY_TIMES = 3;

    @Autowired
    TelephoneMsgService telephoneMsgService;

    private static ThreadPoolExecutor executor;

    static {
        final Integer corePoolSize = 8;
        final Integer maxPoolSize = 100;
        final Long idleKeepAliveTime = 10L;
        final TimeUnit keepAliveTimeUnit = TimeUnit.MILLISECONDS;
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        executor = new ThreadPoolExecutor(corePoolSize,
                maxPoolSize,
                idleKeepAliveTime,
                keepAliveTimeUnit,
                queue,
                new ThreadPoolExecutor.DiscardPolicy());
    }

    private static final Map<String, Integer> failTimesMap = new ConcurrentHashMap<>();

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {

        Object payload = message.getPayload();

        if (payload instanceof Map) {
            try {
                Map<String, String> phoneAndCode = (Map<String, String>) payload;
                String sendTo = phoneAndCode.get(TelephoneMsgService.sendToKey);
                String code = phoneAndCode.get(TelephoneMsgService.VERIFY_CODE_KEY);
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            telephoneMsgService.sendVerifyCode(sendTo, code);
                            failTimesMap.remove(sendTo);
                        } catch (Exception ex) {
                            if (failTimesMap.containsKey(sendTo)) {
                                Integer failTime = failTimesMap.get(sendTo);
                                if (failTime < MAX_RETRY_TIMES) {
                                    failTimesMap.put(sendTo, ++failTime);
                                } else {
                                    logger.error("Send Verify code failed after tried 3 times: {}", ex.getStackTrace());
                                }
                            } else {
                                failTimesMap.put(sendTo, 1);
                                executor.submit(this);
                            }
                        }
                    }
                };
                executor.submit(task);
            } catch (ClassCastException ignore) {
                return null;
            }
        }
        return null;
    }

}
